[spark] support distributed execution of vector search on spark#8108
[spark] support distributed execution of vector search on spark#8108Stefanietry wants to merge 1 commit into
Conversation
| Broadcast<RoaringNavigableMap64> preFilterBroadcast = | ||
| preFilter == null ? null : engineContext.broadcast(preFilter); | ||
|
|
||
| SerializableFunction<List<byte[]>, Optional<byte[]>> task = |
There was a problem hiding this comment.
This distributed path returns java.util.Optional<byte[]> from the Spark task and then collects it back to the driver. java.util.Optional is not Serializable in Java 8, so Spark will fail serializing the task result with NotSerializableException once this branch actually runs. Could we return a serializable value instead, for example byte[] with null meaning empty, or a small serializable wrapper?
There was a problem hiding this comment.
thanks for pointing it , only considered kryo before, and has fixed as suggested.
| assertThat(df.columns()).hasSize(4); | ||
| rows = df.collectAsList(); | ||
| assertThat(rows).hasSize(5); | ||
| spark.sql("set spark.paimon.vector-search.distribute.enabled = true;"); |
There was a problem hiding this comment.
This assertion does not seem to exercise the new Spark-distributed path: the table only has a small number of vector splits, while SparkVectorReadImpl falls back to super.read unless splits.size() >= global-index.thread-num * 2 (default 64 splits). Because of that, the serialization/distributed execution code can be broken and this test would still pass. Could we force the distributed branch in this test, for example by setting spark.paimon.global-index.thread-num=1 or by creating enough index shards/splits?
There was a problem hiding this comment.
thanks for pointing it , has fixed in latest version
| return dataOutputSerializer.getCopyOfBuffer(); | ||
| } | ||
|
|
||
| public ScoredGlobalIndexResult deserialize(byte[] data) throws IOException { |
There was a problem hiding this comment.
This helper cannot round-trip an empty ScoredGlobalIndexResult. serialize() writes only scoreSize=0 for scored results whose bitmap is empty, and the existing deserializer interprets scoreSize == 0 as a plain GlobalIndexResult; this deserialize(byte[]) method then fails the instanceof ScoredGlobalIndexResult check. In the distributed reader, a split group can legitimately produce an empty scored result when the scalar pre-filter excludes all rows in that group, so this can make filtered distributed searches fail even though the local path handles empty optionals. We probably need an explicit scored/non-scored marker in the serialization format, or avoid serializing empty scored results as successful task results.
There was a problem hiding this comment.
This method only considers the scenario where ScoredGlobalIndexResult is not null. For the null scenario, it directly returns null (in the previous version, it returned Optional.empty) and avoids serialization. Please refer to org.apache.paimon.spark.read.SparkVectorReadImpl#read for detailed information.
If you want to distinguish between GlobalIndexResult and ScoredGlobalIndexResult in org.apache.paimon.globalindex.GlobalIndexResultSerializer#deserialize(org.apache.paimon.io.DataInputView),
I'd be happy to create a separate issue to support this later; please see if that's feasible?
93353be to
e8d7694
Compare
ef31bad to
dbc62f3
Compare
8de0c54 to
1d7b368
Compare
|
|
||
| abstract boolean preserveOnDelete(); | ||
| /** SPI for engine specific {@link VectorSearchBuilder} creation. */ | ||
| public interface VectorSearchBuilderProvider { |
There was a problem hiding this comment.
Can you avoid introducing this interface? You can just modify PaimonBaseScan to custom vector search logical.
There was a problem hiding this comment.
like this ? val vectorSearchBuilder = if (CoreOptions.fromMap(table.options).vectorSearchDistributeEnabled()) { new SparkVectorSearchBuilderImpl(table) } else { table.newVectorSearchBuilder(); }
The previous implementation was planned to integrate the construction of VectorSearchBuilder into org.apache.paimon.table.InnerTable#newVectorSearchBuilder; If the aforementioned custom method in PaimonBaseScan is feasible, I have completed the modification latest version.
| import java.util.List; | ||
|
|
||
| /** Factory for {@link VectorSearchBuilder}. */ | ||
| public class VectorSearchBuilderFactory { |
There was a problem hiding this comment.
Can you avoid introducing this interface? You can just modify PaimonBaseScan to custom vector search logical.
There was a problem hiding this comment.
As mentioned above, it has been removed.
1d7b368 to
7994022
Compare
Purpose
Purpose: Currently, vector search operation is executed on a single node within the driver, which may lead to performance bottlenecks when dealing with large amounts of data. This issue aims to implement a distributed execution capability.
Linked issue: #8107
Tests
Add distributed vector search test via the parameter
vector-search.distribute.enabledon org.apache.paimon.spark.SparkMultimodalITCase#testVector